package com.atguigu.gmall.realtime.test;

import com.atguigu.gmall.realtime.common.base.BaseApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import org.apache.doris.flink.source.DorisSource;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;
import java.util.Properties;

public class Realtime05_DorisConnectorStream extends BaseApp {
    public static void main(String[] args) {
        new Realtime05_DorisConnectorStream().start(5678, 1, "Realtime05_Doris_Connector_Stream", Constant.TOPIC_LOG);
    }

    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        // 创建DorisSource

        DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
                .setDorisOptions(
                        DorisOptions.builder()
                                .setFenodes("hadoop102:7030")
                                .setTableIdentifier("test.table4")
                                .setUsername("root")
                                .setPassword("aaaaaa")
                                .build()
                )
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDeserializer(new SimpleListDeserializationSchema())
                .build();

        DataStreamSource<List<?>> dorisDs = env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source");
        dorisDs.print();


        // 写入Doris

        DataStreamSource<String> jsonDs = env.fromElements(
                /*"{\"siteid\":1, \"citycode\":1, \"username\":\"zhangsan\" , \"pv\":1000}",
                "{\"siteid\":2, \"citycode\":2, \"username\":\"lisi\" , \"pv\":2000}",
                "{\"siteid\":3, \"citycode\":3, \"username\":\"wangwu\" , \"pv\":3000}"*/
                "{\"ar\":\"30\",\"ch\":\"Appstore\",\"cur_date\":\"2024-08-09\",\"dur_sum\":129303,\"edt\":\"2024-08-09 15:18:30\",\"is_new\":\"1\",\"pv_ct\":9,\"stt\":\"2024-08-09 15:18:20\",\"sv_ct\":1,\"uv_ct\":0,\"vc\":\"v2.1.134\"}"
        );

        // enable checkpoint
        env.enableCheckpointing(10000);
        // using batch mode for bounded data
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        Properties properties = new Properties();
        // 上游是 json 写入时，需要开启配置
        properties.setProperty("format", "json");
        properties.setProperty("read_json_by_line", "true");


        DorisSink<String> dorisSink = DorisSink.<String>builder()
                .setDorisReadOptions(DorisReadOptions.builder().build())
                .setDorisExecutionOptions(DorisExecutionOptions.builder()
                        // .enable2PC()
                        .setLabelPrefix("label-doris") //streamload label prefix
                        .setDeletable(false)
                        .setStreamLoadProp(properties)
                        .build())
                .setSerializer(
                        // JsonDebeziumSchemaSerializer.builder().build()
                        new SimpleStringSerializer()
                ) //serialize according to string
                .setDorisOptions(DorisOptions.builder()
                        .setFenodes("hadoop102:7030")
                        .setTableIdentifier("gmall2024_realtime.dws_traffic_vc_ch_ar_is_new_page_view_window")
                        .setUsername("root")
                        .setPassword("aaaaaa")
                        .build())
                .build();

        // 写入
        jsonDs.sinkTo(dorisSink);


    }
}
